<?php

namespace Net;

require_once "EventBaseTrait.php";

global $gReadableSocketListens;
global $gWriteableSocketListens;
$gReadableSocketListens = [];
$gWriteableSocketListens = [];

/**
 * Class Messenger 信使
 * @package App\Library\Foundation\Socket
 *
 * 事件列表:
 *  ready  处于listen状态的socket才会收到此事件
 *  recv
 *  new_connect 处于listen状态的socket才会收到此事件
 *  close
 *  connec_fail 处于listen状态的socket才会收到此事件
 *
 * before:
 *  close
 *
 */
class Connection
{
    protected $_socket;
    protected $_return;
    protected $_id;
    protected $_recvLength=1024 * 80;
    protected $_block;
    protected $_socketInfo=[];
    
    protected $_ip;
    protected $_port;
    
    static $SOCKET_ID=1;
    
    protected $_connectedBeginAt = null;
    protected $_flag = null;
    protected $_connectToInfo = [];
    protected $_connectTimeout = 0;
    protected $_isListeningSocket=false;
    
    use EventBaseTrait {
        triggerBefore as protected _triggerBefore;
        triggeron as protected _triggerOn;
    }
    
    const SHUTDOWN_READ = 0;
    const SHUTDOWN_WRITE = 1;
    const SHUTDOWN_BOTH = 2;
    
    const EVENT_TYPE_ERROR = 'error';
    const EVENT_TYPE_CONNECT = 'connect';
    const EVENT_TYPE_CLOSE = 'close';
    const EVENT_TYPE_READY = 'ready';
    const EVENT_TYPE_RECV = 'recv';
    const EVENT_TYPE_CONNECT_FAIL = 'connect_fail';
    const EVENT_TYPE_NEW_CONNECT = 'new_connect';
    
    /**
     * WScoket constructor.
     * @param null $socket
     */
    public function __construct($socket=null)
    {
        if($socket == null) {
            $this->init()->_return = null;
        } else {
            $this->_socket = $socket;
        }
        
        $this->setBlock(false);
        $this->_id = self::$SOCKET_ID ++;
    }
    
    public function getId() {
        return $this->_id;
    }
    
    public function getLastReturn() {
        return $this->_return;
    }
    
    public function setFlag($str) {
        $this->_flag = $str;
        return $this;
    }
    
    public function getFlag() {
        return $this->_flag;
    }
    
    public function __toString()
    {
        return $this->toString();
    }
    
    public function toString() {
        $sockname = $this->getpeername();
        $socknameStr = '';
        
        if(!empty($sockname)) {
            $socknameStr = "-{$sockname['address']}:{$sockname['port']}";
        }
        
        return "[{$this->_flag}-{$this->_id}$socknameStr]";
    }
    
    /**
     * 创建一个套接字
     * @param int $type
     * @param int $domain
     * @param int $protocol
     * @return $this
     */
    public function init($type=SOCK_STREAM, $domain=AF_INET, $protocol=SOL_TCP)
    {
        $this->_socketInfo = [
            'type' => $type,
            'domain' => $domain,
            'protocol' => $protocol,
        ];
        $this->_return = $this->_socket = socket_create($domain, $type, $protocol);
        return $this;
    }
    
    public function isUdp() {
        return $this->_socketInfo['protocol'] == SOL_UDP;
    }
    
    public function setSendTimeout($sec) {
        return $this->setOption(SOL_SOCKET, SO_RCVTIMEO,["sec"=>$sec, "usec"=>0]);
    }
    
    public function setRecvTimeout($sec) {
        return $this->setOption(SOL_SOCKET, SO_RCVTIMEO,["sec"=>$sec, "usec"=>0]);
    }
    
    /**
     * 套接字绑定地址
     * @param $address
     * @param int $port
     * @return $this
     */
    public function bind($address, $port=0)
    {
        $this->_return = @socket_bind($this->_socket, $address, $port);
        return $this;
    }
    
    /**
     * 监听套接字
     * @param int $backlog
     * @return $this
     */
    public function listen($backlog=0)
    {
        $this->_return = @socket_listen($this->_socket, $backlog);
        static::addReadableSocketListen($this);
        $this->_isListeningSocket = true;
        return $this;
    }
    
    /**
     * 连接到指定地址
     * @param $address
     * @param int $port
     * @return $this
     */
    public function connect($address, $port=0)
    {
        $this->triggerBeforeConnect([Event::PARAM_ADDRESS=>$address, Event::PARAM_PORT => $port]);
        $this->_connectedBeginAt = microtime(true) * 1000;
        $this->_return = @socket_connect($this->_socket, $address, $port);
        $this->_connectToInfo = [
            'address' => $address,
            'port' => $port,
        ];
     
        if(!$this->isBlockMode()) {
            if($this->isConnecting()) {
                static::addWriteableSocketListen($this);
            } else {
                $this->triggerOnConnectFail();
            }
        }
        
        return $this;
    }
    
    public function getConnectToInfo() {
        return $this->_connectToInfo;
    }
    
    public function connectionTimeConsuming() {
        return microtime(true) * 1000 - $this->_connectedBeginAt;
    }
    
    public function setConnectTimeout($sec) {
        $this->_connectTimeout = $sec*1000;
        return $this;
    }
    
    public function getConnectTimeout() {
        return $this->_connectTimeout;
    }
    
    public function isConnecting() {
        return in_array($this->getLastError(), [SOCKET_EINPROGRESS, SOCKET_EWOULDBLOCK]);
    }
    
    public function getpeername() {
        $this->_return = @socket_getpeername($this->getOriginalSocket(), $address, $port);
        
        if($address || $port) {
            return [
                'address' => $address,
                'port' => $port
            ];
        }
        
        return null;
    }
    
    public function available() {
        return is_resource($this->_socket);
    }
    
    public function getsockname() {
        if($this->getLastError()) {
            return null;
        }
        
        $this->_return = @socket_getsockname($this->getOriginalSocket(), $address, $port);
        
        if($address || $port) {
            return [
                'address' => $address,
                'port' => $port
            ];
        }
        
        return null;
    }
    
    /**
     * 接受一个套接字连接请求
     * @return self|false
     */
    public function accept()
    {
        $socket = socket_accept($this->_socket);
        
        if($socket) {
            $newSocket = new static($socket);
            static::addReadableSocketListen($newSocket);
            return $this->_return = $newSocket;
        } else {
            return $this->_return = false;
        }
    }
    
    /**
     * 关闭套接字
     * @return $this
     */
    public function close()
    {
        $this->_return = null;
        
        if($this->available()) {
            $this->triggerBeforeClose();
            $this->closeWithoutEvent();
            $this->triggerOnClose();
        }
    
        static::removeFromReadableSocketListens($this);
        static::removeFromWriteableSocketListens($this);
        
        return $this;
    }
    
    /**
     * 关闭套接字指定功能
     * @param int $how
     * @return $this
     */
    public function shutdown($how = self::SHUTDOWN_BOTH) {
        $this->_return = null;
        
        if($this->available()) {
            $this->_return = socket_shutdown($this->getOriginalSocket(), $how);
            return $this;
        }
    }
    
    /**
     * 不带事件关闭套接字
     * @return $this
     */
    public function closeWithoutEvent() {
        if($this->available()) {
            socket_close($this->_socket);
        }
    
        static::removeFromReadableSocketListens($this);
        static::removeFromWriteableSocketListens($this);
        return $this;
    }
    
    /**
     * 从套接字中读取
     * @param $call
     * @param int $type
     * @return string
     */
    public function read($call=null, $type=PHP_BINARY_READ)
    {
        if(!is_callable($call)) {
            $type = $call == null ? $type : $call;
            $allBuf = '';
    
            while($buf = socket_read($this->_socket, $this->_recvLength, $type)) {
                $allBuf .= $buf;
            }
    
            return $this->_return = $allBuf;
        }

        while($buf = socket_read($this->_socket, $this->_recvLength, $type)) {
            $call($buf);
        }

        return $this->_return = $buf;
    }
    
    
    
    /**
     * 从套接字中接收buf
     * @param callable|int $call callable或者flags
     * @param null|int $flags
     * @return null|string
     */
    public function recv($call=null, $flags=null)
    {
        if(!is_callable($call)) {
            $flags = $call == null ? $flags : $call;
            $allBuf = '';
            $buf = null;
    
            while($return = @socket_recv($this->_socket, $buf, $this->_recvLength, $flags)) {
                $allBuf .= $buf;
            }
    
            $this->_return = $return;
            return $allBuf;
        }

        while($return = @socket_recv($this->_socket, $buf, $this->_recvLength, $flags)) {
            $call($buf, $return);
        }

        return $this->_return = $return;
    }
    
    
    /**
     * 向此套接字发送数据
     * @param $buf
     * @param $len
     * @param $flags
     * @return $this
     */
    public function send($buf, $len=null, $flags=null)
    {
        $this->_return = 0;
        
        if(empty($buf)) {
            return $this;
        }
        
        if(!$this->available() || !$this->getpeername()) {
            return $this;
        }
    
        if(null == $len) {
            $totalLen = 0;
            $remainingLen  = strlen($buf);
    
            while($remainingLen > 0) {
                $sendCount = @socket_send($this->_socket, $buf, $remainingLen, $flags);
                
                if($sendCount === false) {
                    $this->triggerOnError([Event::PARAM_ERROR_METHOD => 'send']);
                    break;
                }
                
                if($sendCount == 0) {
                    break;
                }
    
                $totalLen += $sendCount;
                $buf = substr($buf, $sendCount);
                $remainingLen = strlen($buf);
            }
    
            $this->_return = $totalLen;
        } else {
            $this->_return = socket_send($this->_socket, $buf, $len, $flags);
        }
        
        return $this;
    }
    
    /**
     * 写入套接字buf
     * @param $buf
     * @param int $len
     * @return $this
     */
    public function write($buf, $len=null)
    {
        $this->_return = 0;
    
        if(empty($buf)) {
            return $this;
        }
        
        if(!$this->available() || !$this->getpeername()) {
            return $this;
        }
        
        if(null == $len) {
            $totalLen = 0;
            $remainingLen = strlen($buf);
            
            while($remainingLen > 0) {
                $sendCount = @socket_write($this->_socket, $buf, $remainingLen);
    
                if($sendCount === false) {
                    $this->triggerOnError([Event::PARAM_ERROR_METHOD => 'write']);
                    break;
                }
                
                if($sendCount == 0) {
                    break;
                }
                
                $totalLen += $sendCount;
                $buf = substr($buf, $sendCount);
                $remainingLen = strlen($buf);
            }
            
            $this->_return = $totalLen;
        } else {
            $this->_return = socket_write($this->_socket, $buf, $len);
        }
        
        return $this;
    }
    
    /**
     * 设置阻塞模式
     * @param bool|true $block
     * @return $this
     */
    public function setBlock($block = true)
    {
        $this->_block = $block;
        
        if ($block) {
            $this->_return = socket_set_block($this->_socket);
        } else {
            $this->_return = socket_set_nonblock($this->_socket);
        }
        
        return $this;
    }
    
    /**
     * 是否阻塞模式
     * @return bool
     */
    public function isBlockMode() {
        return is_bool($this->_block) ? $this->_block : true;
    }
    
    /**
     * 设置套接字扩展选项
     * @param $level
     * @param $optname
     * @param $optval
     * @return $this
     */
    public function setOption($level, $optname, $optval)
    {
        $this->_return = socket_set_option($this->_socket, $level, $optname, $optval);
        return $this;
    }
    
    /**
     * 获取套接字扩展选项
     * @param $level
     * @param $optname
     * @return mixed
     */
    public function getOption($level, $optname)
    {
        $this->_return = null;
        return socket_get_option($this->_socket, $level, $optname);
    }
    
    /**
     * 清空错误信息
     * @return $this
     */
    public function clearError()
    {
        $this->_return = null;
        socket_clear_error($this->_socket);
        return $this;
    }
    
    /**
     * 获取最后一个错误代码
     * @return int
     */
    public function getLastError()
    {
        $this->_return = null;
    
        if(!$this->available()) {
            return 1;
        }
    
        return socket_last_error($this->_socket);
    }
    
    /**
     * 获取最后一个错误消息
     * @return string
     */
    public function getLastErrorMessage()
    {
        $this->_return = null;
        return socket_strerror($this->getLastError());
    }
    
    /**
     * 获取原始socket
     * @return null
     */
    public function getOriginalSocket() {
        return $this->_socket;
    }
    
    /**
     * 监听
     * @param null $addr
     * @param null $port
     * @param int $count
     * @return $this
     */
    public function listening($addr=null, $port=null, $count=0) {
        if(!$this->_socket) {
            $this->init();
        }
        
        $this->setOption(SOL_SOCKET, SO_REUSEADDR, 1);
        
        $this->bind($addr, $port);
        
        if($this->getLastError()) {
            if(!$this->isBlockMode()) {
                $this->triggerOnError([
                    Event::PARAM_ERROR_METHOD => 'bind'
                ]);
            }
            return $this;
        }
        
        $this->_return = $this->listen($count);
        
        if($this->getLastError()) {
            if(!$this->isBlockMode()) {
                $this->triggerOnError([
                    Event::PARAM_ERROR_METHOD => 'listen'
                ]);
            }
            return $this;
        }
        
        return $this;
    }
    
    /**
     * 是否一个监听中的socket
     * @return bool
     */
    public function isListeningSocket() {
        return $this->_isListeningSocket;
    }
    
    /**
     * socket 自动分配
     */
    static public function dispatch() {
        $getOriginalSocket = function(Connection $socket) {
            return $socket->getOriginalSocket();
        };
        
        $readables = static::getReadableSocketListens();
        
        /** @var static $socket */
        foreach ($readables as $socket) {
            if($socket->isListeningSocket()) {
                $socket->triggerOnReady();
            }
        }
        
        $dispatch = function() use (
            $getOriginalSocket
        ) {
            $readables = array_map($getOriginalSocket, static::getReadableSocketListens());
            $originWriteables = array_map($getOriginalSocket, static::getWriteableSocketListens());
            $writeables = $originWriteables;
    
            if(empty($readables) && empty($writeables)) {
                return ;
            }
            
            if(!empty($writeables)) {
                @socket_select($readables, $writeables, $nouse, 1, 0);
            } else {
                @socket_select($readables, $writeables, $nouse, null);
            }
            
            foreach ($originWriteables as $originalSocket) {
                $socket = static::findSocket($originalSocket);
                
                if($socket && $socket->getConnectTimeout()) {
                    if($socket->connectionTimeConsuming() > $socket->getConnectTimeout()) {
                        $socket->closeWithoutEvent();
                        $socket->triggerOnConnectFail([
                            'error_message' => 'Connect timeout!',
                            'error_code' => 'connect_timeout'
                        ]);
                    }
                }
            }
            
            foreach ($readables as $originalSocket) {
                $socket = static::findSocket($originalSocket);
                
                if($socket) {
                    if($socket->isListeningSocket()) {
                        $connection = $socket->accept();
                        $socket->triggerOnNewConnect([
                            Event::PARAM_CONNECTION => $connection,
                            Event::PARAM_SERVER => $socket
                        ]);
                    } else {
                        $message = $socket->recv();
                        
                        if(strlen($message) > 0) {
                            $socket->triggerOnRecv([Event::PARAM_MESSAGE => $message]);
                        }
                    }
    
                    if($socket->getLastReturn() === 0) {
                        $socket->close();
                        continue;
                    }
                }
            }
            
            foreach ($writeables as $originalSocket) {
                $socket = static::findSocket($originalSocket);
    
                if($socket) {
                    if(!$socket->available()) {
                        $socket->closeWithoutEvent();
                        $socket->triggerOnConnectFail([
                            Event::PARAM_ERROR_MESSAGE => $socket->getLastErrorMessage(),
                            Event::PARAM_ERROR_CODE => $socket->getLastError()
                        ]);
                    } else if($socket->getpeername()) {
                        $socket->triggerOnConnect();
                        static::addReadableSocketListen($socket);
                    } else {
                        $socket->closeWithoutEvent();
                        $socket->triggerOnConnectFail([
                            Event::PARAM_ERROR_MESSAGE => $socket->getLastErrorMessage(),
                            Event::PARAM_ERROR_CODE => $socket->getLastError()
                        ]);
                    }
                   
                    static::removeFromWriteableSocketListens($socket);
                }
            }
        };
        
        while(true) $dispatch();
    }
    
    static public function getReadableSocketListens() {
        global $gReadableSocketListens;
        return empty($gReadableSocketListens) ? [] : $gReadableSocketListens;
    }
    
    static public function getWriteableSocketListens() {
        global $gWriteableSocketListens;
        return empty($gWriteableSocketListens) ? [] : $gWriteableSocketListens;
    }
    
    static public function removeFromReadableSocketListens($socket) {
        global $gReadableSocketListens;
        
        if(($index = self::searchArray($socket, $gReadableSocketListens)) !== false) {
            array_splice($gReadableSocketListens, $index, 1);
        }
    }
    
    static public function removeFromWriteableSocketListens($socket) {
        global $gWriteableSocketListens;
    
        if(($index = self::searchArray($socket, $gWriteableSocketListens)) !== false) {
            array_splice($gWriteableSocketListens, $index, 1);
        }
    }
    
    static public function addWriteableSocketListen(Connection $socket) {
        if(!$socket->available()) {
            return;
        }
        
        global $gWriteableSocketListens;
    
        if(self::searchArray($socket, $gWriteableSocketListens) === false) {
            $gWriteableSocketListens []= $socket;
        }
    }
    
    static public function addReadableSocketListen(Connection $socket) {
        if(!$socket->available()) {
            return;
        }
        
        global $gReadableSocketListens;
        
        if(self::searchArray($socket, $gReadableSocketListens) === false) {
            $gReadableSocketListens []= $socket;
        }
    }
    
    static public function searchArray($value, $array) {
        foreach ($array as $i => $row) {
            if($row === $value) {
                return $i;
            }
        }
        
        return false;
    }
    
    /**
     * @param $originalSocket
     * @return mixed|Connection|null
     */
    static public function findSocket($originalSocket) {
        global $gReadableSocketListens, $gWriteableSocketListens;
        $ary = array_merge([], $gReadableSocketListens, $gWriteableSocketListens);
        
        /** @var Connection $item */
        foreach($ary as $item) {
            if($item->getOriginalSocket() === $originalSocket) {
                return $item;
            }
        }
        
        return null;
    }
    
    static public function queryIp($domain) {
        if(preg_match('#^([0-9]{1,3}\.){3}[0-9]{1,3}$#', $domain)) {
            return $domain;
        }
        
        $ip = gethostbyname($domain);
        
        if($ip == $domain) {
            return null;
        }
        
        return static::queryIp($ip);
    }
    
    public function triggerOn($type, $params=[]) {
        $params[Event::PARAM_CONNECTION] = $params[Event::PARAM_CONNECTION] ?: $this;
        
        if($type == self::EVENT_TYPE_ERROR) {
            $params[Event::PARAM_ERROR_CODE] = $params[Event::PARAM_ERROR_CODE] ?: $this->getLastError();
            $params[Event::PARAM_ERROR_MESSAGE] = $params[Event::PARAM_ERROR_MESSAGE] ?: $this->getLastErrorMessage();
            
            $traces = debug_backtrace();
            array_shift($traces);
            $traces = array_map(function($row) {
                unset($row['object']);
                return $row;
            }, $traces);
            
            $params[Event::PARAM_ERROR_TRACES] = $traces;
        }
        
        return $this->_triggerOn($type, $params);
    }
    
    public function triggerBefore($type, $params=[]) {
        $params[Event::PARAM_CONNECTION] = $params[Event::PARAM_CONNECTION] ?: $this;
        return $this->_triggerBefore($type, $params);
    }
    
    public function onClose($callback) {
        return $this->on(self::EVENT_TYPE_CLOSE, $callback);
    }
    
    public function beforeClose($callback) {
        return $this->before(self::EVENT_TYPE_CLOSE, $callback);
    }
    
    public function onConnect($callback) {
        return $this->on(self::EVENT_TYPE_CONNECT, $callback);
    }
    
    public function beforeConnect($callback) {
        return $this->before(self::EVENT_TYPE_CONNECT, $callback);
    }
    
    public function onConnectFail($callback) {
        return $this->on(self::EVENT_TYPE_CONNECT_FAIL, $callback);
    }
    
    public function onError($callback) {
        return $this->on(self::EVENT_TYPE_ERROR, $callback);
    }
    
    public function onRecv($callback) {
        return $this->on(self::EVENT_TYPE_RECV, $callback);
    }
    
    public function onNewConnect($callback) {
        return $this->on(self::EVENT_TYPE_NEW_CONNECT, $callback);
    }
    
    public function onReady($callback) {
        return $this->on(self::EVENT_TYPE_READY, $callback);
    }
    
    
    public function triggerOnClose($params=[]) {
        return $this->triggerOn(self::EVENT_TYPE_CLOSE, $params);
    }
    
    public function triggerBeforeClose($params=[]) {
        return $this->triggerBefore(self::EVENT_TYPE_CLOSE, $params);
    }
    
    public function triggerOnConnect($params=[]) {
        return $this->triggerOn(self::EVENT_TYPE_CONNECT, $params);
    }
    
    public function triggerBeforeConnect($params=[]) {
        return $this->triggerBefore(self::EVENT_TYPE_CONNECT, $params);
    }
    
    public function triggerOnConnectFail($params=[]) {
        return $this->triggerOn(self::EVENT_TYPE_CONNECT_FAIL, $params);
    }
    
    public function triggerOnError($params=[]) {
        return $this->triggerOn(self::EVENT_TYPE_ERROR, $params);
    }
    
    public function triggerOnRecv($params=[]) {
        return $this->triggerOn(self::EVENT_TYPE_RECV, $params);
    }
    
    public function triggerOnNewConnect($params=[]) {
        return $this->triggerOn(self::EVENT_TYPE_NEW_CONNECT, $params);
    }
    
    public function triggerOnReady($params=[]) {
        return $this->triggerOn(self::EVENT_TYPE_READY, $params);
    }
    
}